/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.hive;

import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;

import sun.security.krb5.Config;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import sun.security.krb5.Config;

public class HiveDataSourceClient extends CommonDataSourceClient {

  private static final Logger logger = LoggerFactory.getLogger(HiveDataSourceClient.class);

  private ScheduledExecutorService kerberosRenewalService;

  private Configuration hadoopConf;
  private UserGroupInformation ugi;
  private boolean retryGetConnection = true;

  public HiveDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
    super(baseConnectionParam, dbType);
  }

  @Override
  protected void preInit() {
    logger.info("PreInit in {}", getClass().getName());
    this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build());
  }

  @Override
  protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
    logger.info("Create Configuration for hive configuration.");
    this.hadoopConf = createHadoopConf();
    logger.info("Create Configuration success.");

    logger.info("Create UserGroupInformation.");
    this.ugi = createUserGroupInformation(baseConnectionParam.getUser());
    logger.info("Create ugi success.");

    this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
    this.jdbcTemplate = new JdbcTemplate(dataSource);
    logger.info("Init {} success.", getClass().getName());
  }

  @Override
  protected void checkEnv(BaseConnectionParam baseConnectionParam) {
    super.checkEnv(baseConnectionParam);
    checkKerberosEnv();
  }

  private void checkKerberosEnv() {
    String krb5File = PropertyUtils.getString(JAVA_SECURITY_KRB5_CONF_PATH);
    Boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
    if (kerberosStartupState && StringUtils.isNotBlank(krb5File)) {
      System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File);
      try {
        Config.refresh();
        Class<?> kerberosName = Class.forName("org.apache.hadoop.security.authentication.util.KerberosName");
        Field field = kerberosName.getDeclaredField("defaultRealm");
        field.setAccessible(true);
        field.set(null, Config.getInstance().getDefaultRealm());
      } catch (Exception e) {
        throw new RuntimeException("Update Kerberos environment failed.", e);
      }
    }
  }

  private UserGroupInformation createUserGroupInformation(String username) {
    String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
    String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH);
    String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);

    try {
      UserGroupInformation ugi = CommonUtil.createUGI(getHadoopConf(), principal, keytab, krb5File, username);
      try {
        Field isKeytabField = ugi.getClass().getDeclaredField("isKeytab");
        isKeytabField.setAccessible(true);
        isKeytabField.set(ugi, true);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        logger.warn(e.getMessage());
      }

      kerberosRenewalService.scheduleWithFixedDelay(() -> {
        try {
          ugi.checkTGTAndReloginFromKeytab();
        } catch (IOException e) {
          logger.error("Check TGT and Renewal from Keytab error", e);
        }
      }, 5, 5, TimeUnit.MINUTES);
      return ugi;
    } catch (IOException e) {
      throw new RuntimeException("createUserGroupInformation fail. ", e);
    }
  }

  protected Configuration createHadoopConf() {
    Configuration hadoopConf = new Configuration();
    hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed", true);
    return hadoopConf;
  }

  protected Configuration getHadoopConf() {
    return this.hadoopConf;
  }

  @Override
  public Connection getConnection() {
    try {
      return dataSource.getConnection();
    } catch (SQLException e) {
      boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
      if (retryGetConnection && kerberosStartupState) {
        retryGetConnection = false;
        createUserGroupInformation(baseConnectionParam.getUser());
        Connection connection = getConnection();
        retryGetConnection = true;
        return connection;
      }
      logger.error("get oneSessionDataSource Connection fail SQLException: {}", e.getMessage(), e);
      return null;
    }
  }

  @Override
  public void close() {
    try {
      super.close();
    } finally {
      kerberosRenewalService.shutdown();
      this.ugi = null;
    }
    logger.info("Closed Hive datasource client.");

  }
}
